/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.runtime.scheduler.adaptive;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.runtime.testutils.InternalMiniClusterExtension;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import java.time.Duration;

import static org.apache.flink.runtime.util.JobVertexConnectionUtils.connectNewDataSetAsInput;
import static org.assertj.core.api.Assertions.assertThat;

/**
 * Integration tests verifying that the adaptive scheduler can execute jobs which rely on slot
 * sharing. The tests run against a mini cluster that has a single TaskManager offering a single
 * slot.
 */
class AdaptiveSchedulerSlotSharingITCase {

    private static final int NUMBER_TASK_MANAGERS = 1;
    private static final int NUMBER_SLOTS_PER_TASK_MANAGER = 1;
    private static final int PARALLELISM = 1;

    @RegisterExtension
    private static final InternalMiniClusterExtension INTERNAL_MINI_CLUSTER_EXTENSION =
            new InternalMiniClusterExtension(
                    new MiniClusterResourceConfiguration.Builder()
                            .setConfiguration(getConfiguration())
                            .setNumberTaskManagers(NUMBER_TASK_MANAGERS)
                            .setNumberSlotsPerTaskManager(NUMBER_SLOTS_PER_TASK_MANAGER)
                            .build());

    /**
     * Builds the cluster configuration: selects the adaptive scheduler and shortens the
     * TaskManager's slot timeout.
     */
    private static Configuration getConfiguration() {
        final Configuration config = new Configuration();
        config.set(JobManagerOptions.SCHEDULER, JobManagerOptions.SchedulerType.Adaptive);

        // Why the slot timeout is lowered:
        //
        // This test used to fail occasionally because of a race between two actions that are
        // both triggered through JobMaster.onStop():
        //   1. the TaskManager freeing the slot (TaskSlotTableImpl.freeSlotInternal(..)), and
        //   2. the ResourceManager cleaning up the job's requirements
        //      (JobMaster.dissolveResourceManagerConnection(..)).
        //
        // If the TaskManager frees the slot before the ResourceManager has cleaned up the
        // requirements, the DefaultDeclarativeSlotPool triggers a new slot request for the
        // already finished job. In that situation the AdaptiveScheduler's resource timeout and
        // the TaskManager's slot timeout compete with each other, because both timeouts are set
        // to 10s by default. The second job would fail if the resource timeout fired before the
        // additional slot request was handled. Lowering the slot timeout works around this
        // special case.
        config.set(TaskManagerOptions.SLOT_TIMEOUT, Duration.ofSeconds(5));

        return config;
    }

    @Test
    void testSchedulingOfJobRequiringSlotSharing() throws Exception {
        // the job is executed more than once to ensure slots are cleaned up properly between runs
        final int numberOfRuns = 2;
        for (int run = 0; run < numberOfRuns; run++) {
            runJob();
        }
    }

    /**
     * Submits a newly created slot-sharing job to the mini cluster, waits for the job result and
     * verifies that the job succeeded.
     *
     * @throws Exception if converting the job result into an execution result fails
     */
    private void runJob() throws Exception {
        final JobGraph slotSharingJob = createJobGraphWithSlotSharingGroup();
        final MiniCluster cluster = INTERNAL_MINI_CLUSTER_EXTENSION.getMiniCluster();

        // block until the submission has completed
        cluster.submitJob(slotSharingJob).join();

        // block until the result of the job is available
        final JobResult result = cluster.requestJobResult(slotSharingJob.getJobID()).join();

        // the conversion throws an exception if the job failed
        result.toJobExecutionResult(getClass().getClassLoader());

        assertThat(result.isSuccess()).isTrue();
    }

    /**
     * Returns a JobGraph consisting of a source and a sink vertex that are assigned to the same
     * slot sharing group. The job requires slot sharing to work in order to be able to run with
     * a single slot.
     */
    private static JobGraph createJobGraphWithSlotSharingGroup() {
        final SlotSharingGroup sharedGroup = new SlotSharingGroup();

        final JobVertex source = createNoOpVertex("Source", sharedGroup);
        final JobVertex sink = createNoOpVertex("sink", sharedGroup);

        // the sink consumes the source's output through a pipelined, pointwise connection
        connectNewDataSetAsInput(
                sink, source, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);

        return JobGraphTestUtils.streamingJobGraph(source, sink);
    }

    /**
     * Creates a vertex running {@link NoOpInvokable} with {@link #PARALLELISM} that belongs to
     * the given slot sharing group.
     *
     * @param name name of the vertex
     * @param slotSharingGroup slot sharing group the vertex is assigned to
     * @return the configured vertex
     */
    private static JobVertex createNoOpVertex(String name, SlotSharingGroup slotSharingGroup) {
        final JobVertex vertex = new JobVertex(name);
        vertex.setInvokableClass(NoOpInvokable.class);
        vertex.setParallelism(PARALLELISM);
        vertex.setSlotSharingGroup(slotSharingGroup);
        return vertex;
    }
}
